// Implements the message-transmit (message forwarding) sub-service
#include <brpc/server.h>
#include <butil/logging.h>

#include "channel.hpp"
#include "etcd.hpp"   // 服务注册模块封装
#include "logger.hpp" // 日志模块封装
#include "mysql_chat_session_member.hpp"
#include "rabbitmq.hpp"
#include "utils.hpp"

#include "base.pb.h"      // protobuf框架代码
#include "transmite.pb.h" // protobuf框架代码
#include "user.pb.h"      // protobuf框架代码

/**
 * 示例消息格式：
 * message {
    message_id: "550e8400-e29b-41d4-a716-446655440000"
    chat_session_id: "chat_123"
    timestamp: 1698765432
    sender {
      user_id: "123"
      nickname: "Alice"
      avatar_id: "avatar_123"
    }
    message {
      text: "Hello, World!"
      image_url: ""
      file_url: ""
    }
  }
*/

namespace ken_im
{
/**
 * @brief RPC implementation of the message-transmit service.
 *
 * For every new-message request it asks the user sub-service for the sender's
 * profile, assembles a complete MessageInfo, reads the member list of the
 * target chat session, publishes the serialized message to the message queue
 * and finally returns the message together with the member id list.
 */
class TransmiteServiceImpl : public ken_im::MsgTransmitService
{
  public:
    /**
     * @param user_service_name Name used to pick a channel to the user
     *                          sub-service from @p channels.
     * @param channels          Channel manager used to reach other services.
     * @param mysql_client      Database handle used to build the chat-session
     *                          member table accessor.
     * @param exchange_name     Exchange the assembled message is published to.
     * @param routing_key       Routing key used when publishing.
     * @param mq_client         Message-queue client used for publishing.
     */
    TransmiteServiceImpl(
        const std::string &user_service_name,
        const ServiceManager::ptr &channels,
        const std::shared_ptr<odb::core::database> &mysql_client,
        const std::string &exchange_name, const std::string &routing_key,
        const MQClient::ptr &mq_client )
        : _user_service_name( user_service_name ), _mm_channels( channels ),
          _mysql_session_member_table(
              std::make_shared<ChatSessionMemeberTable>( mysql_client ) ),
          _exchange_name( exchange_name ), _routing_key( routing_key ),
          _mq_client( mq_client )
    {
    }
    ~TransmiteServiceImpl()
    {
    }

    /**
     * @brief Handle a new-message request.
     *
     * Steps: fetch the sender's info from the user sub-service, build the
     * MessageInfo (generated id, session id, current time, sender, content),
     * load the session's member ids, publish the serialized message to the
     * message queue (the original author notes this is for persistence by the
     * message-storage sub-service), then fill the response.
     *
     * On any failure the response carries success=false and an error message;
     * `done` is always run when the function returns.
     *
     * @param controller Unused by this implementation.
     * @param request    Incoming request (request id, user id, session id,
     *                   message content).
     * @param response   Filled with the assembled message and target id list.
     * @param done       Completion closure, run automatically on return.
     */
    void GetTransmitTarget( google::protobuf::RpcController *controller,
                            const ::ken_im::NewMessageReq *request,
                            ::ken_im::GetTransmitTargetRsp *response,
                            ::google::protobuf::Closure *done ) override
    {
        // Declared first so it is destroyed last: `done` runs after every
        // other local has finished using `request` / `response`.
        brpc::ClosureGuard rpc_guard( done );

        // Fills the response with a failure result for the given request id.
        auto err_response = [response]( const std::string &rid,
                                        const std::string &errmsg ) -> void
        {
            response->set_request_id( rid );
            response->set_success( false );
            response->set_errmsg( errmsg );
        };

        // Key fields of the request; bound by reference, `request` stays
        // alive until `done` is run.
        const std::string &rid = request->request_id();
        const std::string &uid = request->user_id();
        const std::string &chat_ssid = request->chat_session_id();
        const MessageContent &content = request->message();

        // Look up the sender through the user sub-service.
        auto channel = _mm_channels->choose( _user_service_name );
        if ( !channel )
        {
            LOG_ERROR( "{}-{} 没有可供访问的用户子服务节点！", rid,
                       _user_service_name );
            return err_response( rid, "没有可供访问的用户子服务节点！" );
        }
        UserService_Stub stub( channel.get() );
        GetUserInfoReq req;
        GetUserInfoRsp rsp;
        req.set_request_id( rid );
        req.set_user_id( uid );
        brpc::Controller cntl;

        // A null closure makes this a synchronous call.
        stub.GetUserInfo( &cntl, &req, &rsp, nullptr );
        if ( cntl.Failed() )
        {
            // Transport-level failure: the controller holds the reason.
            LOG_ERROR( "{} - 用户子服务调用失败：{}！", rid,
                       cntl.ErrorText() );
            return err_response( rid, "用户子服务调用失败!" );
        }
        if ( !rsp.success() )
        {
            // The call went through but the user service rejected it; the
            // controller's error text is empty here, so it is not logged.
            LOG_ERROR( "{} - 用户子服务处理请求失败，用户ID：{}！", rid, uid );
            return err_response( rid, "用户子服务调用失败!" );
        }

        // Assemble the full message: id, session, time, sender, content.
        MessageInfo message;
        message.set_message_id( uuid() );
        message.set_chat_session_id( chat_ssid );
        message.set_timestamp( time( nullptr ) );
        message.mutable_sender()->CopyFrom( rsp.user_info() );
        message.mutable_message()->CopyFrom( content );

        // Ids of the session members the message has to be forwarded to.
        auto target_list = _mysql_session_member_table->members( chat_ssid );

        // Publish the assembled message to the message queue.
        const bool published = _mq_client->publish(
            _exchange_name, message.SerializeAsString(), _routing_key );
        if ( !published )
        {
            // Report the queue coordinates; the RPC controller above is
            // unrelated to this failure.
            LOG_ERROR( "{} - 持久化消息发布失败：exchange={}, routing_key={}！",
                       rid, _exchange_name, _routing_key );
            return err_response( rid, "持久化消息发布失败：!" );
        }

        // Build the success response.
        response->set_request_id( rid );
        response->set_success( true );
        response->mutable_message()->CopyFrom( message );
        for ( const auto &id : target_list )
        {
            response->add_target_id_list( id );
        }
    }

  private:
    // How to reach the user sub-service.
    std::string _user_service_name;
    ServiceManager::ptr _mm_channels;

    // Accessor for the chat-session member table.
    ChatSessionMemeberTable::ptr _mysql_session_member_table;

    // Message-queue publishing parameters and client handle.
    std::string _exchange_name;
    std::string _routing_key;
    MQClient::ptr _mq_client;
};

/**
 * @brief Holds the long-lived components of the transmit service and runs the
 *        RPC server.
 *
 * The object only stores the handles it is given; the RPC server is expected
 * to have been configured before being passed in (start() merely blocks on
 * it).
 */
class TransmiteServer
{
  public:
    using ptr = std::shared_ptr<TransmiteServer>;

    /**
     * @param mysql_client     Database client handle.
     * @param discovery_client Service-discovery client handle.
     * @param registry_client  Service-registration client handle.
     * @param server           RPC server to run in start().
     *
     * All handles are taken by const reference and copied into members, so
     * each shared_ptr is copied exactly once.
     */
    TransmiteServer( const std::shared_ptr<odb::core::database> &mysql_client,
                     const Discovery::ptr &discovery_client,
                     const Registry::ptr &registry_client,
                     const std::shared_ptr<brpc::Server> &server )
        : _service_discoverer( discovery_client ),
          _registry_client( registry_client ), _mysql_client( mysql_client ),
          _rpc_server( server )
    {
    }
    ~TransmiteServer()
    {
    }

    /// Blocks the calling thread until the RPC server is asked to quit.
    void start()
    {
        _rpc_server->RunUntilAskedToQuit();
    }

  private:
    Discovery::ptr _service_discoverer;                 // service-discovery client
    Registry::ptr _registry_client;                     // service-registration client
    std::shared_ptr<odb::core::database> _mysql_client; // MySQL database client
    std::shared_ptr<brpc::Server> _rpc_server;          // RPC server run by start()
};

/**
 * @brief Step-by-step builder for TransmiteServer.
 *
 * Call the make_* methods to create each component, then build().
 * make_rpc_server() requires the MQ client, the channel manager and the MySQL
 * client to exist already; build() requires the discovery client, the
 * registry client and the RPC server. A missing prerequisite is logged and
 * the process is aborted.
 */
class TransmiteServerBuilder
{
  public:
    /// Creates the MySQL client through ODBFactory with the given connection
    /// parameters and connection-pool size.
    void make_mysql_object( const std::string &user, const std::string &pswd,
                            const std::string &host, const std::string &db,
                            const std::string &cset, int port,
                            int conn_pool_count )
    {
        _mysql_client = ODBFactory::create( user, pswd, host, db, cset, port,
                                            conn_pool_count );
    }

    /**
     * @brief Creates the channel manager and the service-discovery client.
     *
     * @param reg_host          Address of the registry to watch.
     * @param base_service_name Base name passed to the discovery client.
     * @param user_service_name Name of the user sub-service; it is declared
     *                          on the channel manager and remembered for the
     *                          RPC service.
     */
    void make_discovery_object( const std::string &reg_host,
                                const std::string &base_service_name,
                                const std::string &user_service_name )
    {
        _user_service_name = user_service_name;
        _mm_channels = std::make_shared<ServiceManager>();
        _mm_channels->declared( user_service_name );
        LOG_DEBUG( "设置用户子服务为需添加管理的子服务：{}",
                   user_service_name );
        // Bind the shared_ptr itself (not a raw pointer) so the callbacks
        // stored inside the discovery client keep the channel manager alive
        // for as long as they can be invoked.
        auto put_cb =
            std::bind( &ServiceManager::onServiceOnline, _mm_channels,
                       std::placeholders::_1, std::placeholders::_2 );
        auto del_cb =
            std::bind( &ServiceManager::onServiceOffline, _mm_channels,
                       std::placeholders::_1, std::placeholders::_2 );
        _service_discoverer = std::make_shared<Discovery>(
            reg_host, base_service_name, put_cb, del_cb );
    }

    /// Creates the service-registration client and immediately registers
    /// @p service_name with access address @p access_host.
    void make_registry_object( const std::string &reg_host,
                               const std::string &service_name,
                               const std::string &access_host )
    {
        _registry_client = std::make_shared<Registry>( reg_host );
        _registry_client->registry( service_name, access_host );
    }

    /**
     * @brief Creates the RabbitMQ client and declares its components.
     *
     * The exchange name and binding key are remembered; the binding key is
     * later used as the routing key when publishing.
     */
    void make_mq_object( const std::string &user, const std::string &passwd,
                         const std::string &host,
                         const std::string &exchange_name,
                         const std::string &queue_name,
                         const std::string &binding_key )
    {
        _routing_key = binding_key;
        _exchange_name = exchange_name;
        _mq_client = std::make_shared<MQClient>( user, passwd, host );
        _mq_client->declareComponents( exchange_name, queue_name, binding_key );
    }

    /**
     * @brief Creates the RPC server, adds the transmit service and starts it.
     *
     * @param port        Port to listen on.
     * @param timeout     Assigned to ServerOptions::idle_timeout_sec.
     * @param num_threads Assigned to ServerOptions::num_threads.
     *
     * Aborts the process if a prerequisite is missing, or if adding the
     * service or starting the server fails.
     */
    void make_rpc_server( uint16_t port, int32_t timeout, uint8_t num_threads )
    {
        if ( !_mq_client )
        {
            LOG_ERROR( "还未初始化消息队列客户端模块！" );
            abort();
        }
        if ( !_mm_channels )
        {
            LOG_ERROR( "还未初始化信道管理模块！" );
            abort();
        }
        if ( !_mysql_client )
        {
            LOG_ERROR( "还未初始化Mysql数据库模块！" );
            abort();
        }

        _rpc_server = std::make_shared<brpc::Server>();

        // Raw allocation is intentional: ownership is handed to the server
        // through SERVER_OWNS_SERVICE below.
        TransmiteServiceImpl *transmite_service = new TransmiteServiceImpl(
            _user_service_name, _mm_channels, _mysql_client, _exchange_name,
            _routing_key, _mq_client );

        // Treat any non-zero result as failure rather than only -1.
        int ret = _rpc_server->AddService(
            transmite_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE );
        if ( ret != 0 )
        {
            LOG_ERROR( "添加Rpc服务失败！" );
            abort();
        }
        brpc::ServerOptions options;
        options.idle_timeout_sec = timeout;
        options.num_threads = num_threads;
        ret = _rpc_server->Start( port, &options );
        if ( ret != 0 )
        {
            LOG_ERROR( "服务启动失败！" );
            abort();
        }
    }

    /**
     * @brief Assembles the TransmiteServer from the prepared components.
     *
     * @return Shared pointer to the new server object.
     *
     * Aborts the process if the discovery client, the registry client or the
     * RPC server has not been created yet.
     */
    TransmiteServer::ptr build()
    {
        if ( !_service_discoverer )
        {
            LOG_ERROR( "还未初始化服务发现模块！" );
            abort();
        }
        if ( !_registry_client )
        {
            LOG_ERROR( "还未初始化服务注册模块！" );
            abort();
        }
        if ( !_rpc_server )
        {
            LOG_ERROR( "还未初始化RPC服务器模块！" );
            abort();
        }
        return std::make_shared<TransmiteServer>(
            _mysql_client, _service_discoverer, _registry_client, _rpc_server );
    }

  private:
    // Access to the user sub-service.
    std::string _user_service_name;
    ServiceManager::ptr _mm_channels;
    Discovery::ptr _service_discoverer;

    // Message-queue publishing parameters and client.
    std::string _routing_key;
    std::string _exchange_name;
    MQClient::ptr _mq_client;

    Registry::ptr _registry_client;                     // service-registration client
    std::shared_ptr<odb::core::database> _mysql_client; // MySQL database client
    std::shared_ptr<brpc::Server> _rpc_server;          // RPC server handed to TransmiteServer
};
} // namespace ken_im